{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### View the Spark session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyspark.sql.session.SparkSession at 0x7f041c01f1d0>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# WordCount Program"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Create an RDD for data stored in HDFS. Change the path to where you have data.\n",
    "poems = sc.textFile(\"/user/mapr/poems\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "8"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Find number of records in poems RDD\n",
    "poems.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'Warm summer sun,'"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Find first record in the RDD\n",
    "poems.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['Warm summer sun,',\n",
       " 'Shine kindly here,',\n",
       " 'Warm southern wind,',\n",
       " 'Blow softly here.',\n",
       " 'Green sod above,']"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "poems.take(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['Warm summer sun,',\n",
       " 'Shine kindly here,',\n",
       " 'Warm southern wind,',\n",
       " 'Blow softly here.',\n",
       " 'Green sod above,',\n",
       " 'Lie light, lie light.',\n",
       " 'Good night, dear heart,',\n",
       " 'Good night, good night.']"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "poems.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('here.', 1),\n",
       " ('good', 3),\n",
       " ('lie', 2),\n",
       " ('green', 1),\n",
       " ('light,', 1),\n",
       " ('above,', 1),\n",
       " ('here,', 1),\n",
       " ('summer', 1),\n",
       " ('night.', 1),\n",
       " ('night,', 2),\n",
       " ('shine', 1),\n",
       " ('blow', 1),\n",
       " ('wind,', 1),\n",
       " ('southern', 1),\n",
       " ('sun,', 1),\n",
       " ('kindly', 1),\n",
       " ('dear', 1),\n",
       " ('heart,', 1),\n",
       " ('softly', 1),\n",
       " ('warm', 2),\n",
       " ('sod', 1),\n",
       " ('light.', 1)]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Count occurrences of each lower-cased, whitespace-separated word.\n",
    "# split() with no argument splits on any run of whitespace, so blank lines and\n",
    "# doubled spaces do not produce an empty-string 'word'. reduceByKey adds up the\n",
    "# 1s per key as it goes instead of first collecting every 1 for a word the way\n",
    "# groupByKey does. Punctuation is left attached ('here.' and 'here,' differ).\n",
    "(\n",
    "    poems\n",
    "    .flatMap(lambda line: line.lower().split())\n",
    "    .map(lambda word: (word, 1))\n",
    "    .reduceByKey(lambda left, right: left + right)\n",
    "    .collect()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exercise\n",
    "Find the average trading volume per stock in 2016 using stocks.csv."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "stocks = sc.textFile(\"/user/mapr/stocks\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1857093"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1857093"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stocks.cache().count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "date,open,high,low,close,volume,adjclose,symbol\n",
      "2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX\n",
      "2000-07-17,22.625,22.75,22.4375,22.5625,201600.0,13.48614,ES\n",
      "2000-07-17,6.750002,6.937503,6.375,6.5,1235700.0,5.241649,CHK\n",
      "2000-07-17,19.812501,20.1875,19.500001,20.1875,1434100.0,3.806147,NI\n",
      "2000-07-17,30.5,30.6875,30.0,30.03125,254600.0,19.81183,SNA\n",
      "2000-07-17,44.749996,45.062498,44.500004,45.000009,535200.0,17.400773,FOXA\n",
      "2000-07-17,19.625,19.625,19.25,19.375,309500.0,13.768835,R\n",
      "2000-07-17,16.6562,16.6875,16.125,16.25,5507200.0,1.755466,ROST\n",
      "2000-07-17,56.25,57.25,56.0625,56.125,7941200.0,18.31076,PG\n"
     ]
    }
   ],
   "source": [
    "for line in stocks.take(10):\n",
    "    print(line)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2016-01-04,46.119999,46.130001,45.360001,45.799999,3472200.0,44.870315,XLNX\n",
      "2016-01-04,50.650002,50.889999,50.23,50.880001,1590300.0,50.053021,ES\n",
      "2016-01-04,4.44,4.97,4.4,4.95,3.84761E7,4.95,CHK\n",
      "2016-01-04,19.41,19.66,19.309999,19.52,2592700.0,19.121168,NI\n",
      "2016-01-04,168.789993,169.5,166.5,168.529999,413800.0,167.208406,SNA\n",
      "2016-01-04,86.279999,87.269997,85.550003,87.18,2657300.0,85.403348,LYB\n",
      "2016-01-04,17.559999,17.639999,17.379999,17.620001,5887800.0,17.325726,WU\n",
      "2016-01-04,26.719999,26.76,26.299999,26.59,1.19141E7,26.446528,FOXA\n",
      "2016-01-04,67.029999,69.269997,64.639999,68.769997,4249800.0,67.626307,WYNN\n",
      "2016-01-04,55.950001,57.200001,55.25,56.900002,1119900.0,56.144751,R\n"
     ]
    }
   ],
   "source": [
    "# Keep only the rows whose first four characters are 2016 (each row starts\n",
    "# with its date field), then preview ten of them.\n",
    "stocks2016 = stocks.filter(lambda row: row[:4] == \"2016\")\n",
    "\n",
    "for line in stocks2016.take(10):\n",
    "    print(line)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "('XLNX', 3472200.0)"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Turn each 2016 CSV row into a (symbol, volume) pair.\n",
    "# Per the header row, field 5 is volume and field 7 is symbol.\n",
    "# The chain is wrapped in parentheses rather than ended with a dangling\n",
    "# backslash, so the statement no longer depends on a following blank line.\n",
    "stocks_paired = (\n",
    "    stocks2016\n",
    "    .map(lambda line: line.split(\",\"))\n",
    "    .map(lambda tokens: (tokens[7], float(tokens[5])))\n",
    ")\n",
    "\n",
    "stocks_paired.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('BAC', 109953689.74358974)\n",
      "('FCX', 47979558.333333336)\n",
      "('CHK', 41622735.256410256)\n",
      "('AAPL', 40944183.974358976)\n",
      "('GE', 37751663.461538464)\n",
      "('F', 37432197.43589743)\n",
      "('PFE', 35777183.974358976)\n",
      "('MSFT', 34194448.07692308)\n",
      "('FB', 28902566.025641024)\n",
      "('MU', 27260807.692307692)\n"
     ]
    }
   ],
   "source": [
    "# Average volume per symbol, largest average first.\n",
    "# Each volume becomes a (sum, count) pair and reduceByKey merges the pairs\n",
    "# two at a time, so no symbol's full list of volumes has to be gathered the\n",
    "# way groupByKey + sum()/len() requires.\n",
    "avg_vol_by_stock = (\n",
    "    stocks_paired\n",
    "    .mapValues(lambda volume: (volume, 1))\n",
    "    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\n",
    "    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])\n",
    "    .sortBy(lambda pair: pair[1], ascending=False)\n",
    ")\n",
    "\n",
    "for pair in avg_vol_by_stock.take(10):\n",
    "    print(pair)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "# saveAsTextFile fails if the target directory already exists, so clear any\n",
    "# output left by a previous run first (-r: recursive, -f: no error if the\n",
    "# path is absent). This makes the cell safe to re-run.\n",
    "!hadoop fs -rm -r -f /user/mapr/stocks_avg\n",
    "avg_vol_by_stock.saveAsTextFile(\"/user/mapr/stocks_avg\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Found 3 items\n",
      "-rwxr-xr-x   1 mapr mapr          0 2017-08-08 04:14 /user/mapr/stocks_avg/_SUCCESS\n",
      "-rwxr-xr-x   1 mapr mapr       8712 2017-08-08 04:14 /user/mapr/stocks_avg/part-00000\n",
      "-rwxr-xr-x   1 mapr mapr       4976 2017-08-08 04:14 /user/mapr/stocks_avg/part-00001\n"
     ]
    }
   ],
   "source": [
    "!hadoop fs -ls /user/mapr/stocks_avg"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('BAC', 109953689.74358974)\r\n",
      "('FCX', 47979558.333333336)\r\n",
      "('CHK', 41622735.256410256)\r\n",
      "('AAPL', 40944183.974358976)\r\n",
      "('GE', 37751663.461538464)\r\n",
      "('F', 37432197.43589743)\r\n",
      "('PFE', 35777183.974358976)\r\n",
      "('MSFT', 34194448.07692308)\r\n",
      "('FB', 28902566.025641024)\r\n",
      "('MU', 27260807.692307692)\r\n",
      "cat: Unable to write to output stream.\r\n",
      "cat: Unable to write to output stream.\r\n"
     ]
    }
   ],
   "source": [
    "# head exits after ten lines and closes the pipe, which makes hadoop report\n",
    "# 'cat: Unable to write to output stream.'; that stderr noise is discarded.\n",
    "# NOTE: this also hides genuine errors (e.g. a missing path) -- drop the\n",
    "# redirect when debugging.\n",
    "!hadoop fs -cat /user/mapr/stocks_avg/part* 2>/dev/null | head"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# MovieLens: Joining Data Using RDDs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "data_home = \"/user/mapr/movielens\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('1', 'Toy Story (1995)')\n",
      "('2', 'Jumanji (1995)')\n",
      "('3', 'Grumpier Old Men (1995)')\n",
      "('4', 'Waiting to Exhale (1995)')\n",
      "('5', 'Father of the Bride Part II (1995)')\n",
      "('6', 'Heat (1995)')\n",
      "('7', 'Sabrina (1995)')\n",
      "('8', 'Tom and Huck (1995)')\n",
      "('9', 'Sudden Death (1995)')\n",
      "('10', 'GoldenEye (1995)')\n"
     ]
    }
   ],
   "source": [
    "import csv\n",
    "\n",
    "\n",
    "def parse_movie(line):\n",
    "    \"\"\"Return (id, title) -- the first two CSV fields of one movies row.\n",
    "\n",
    "    The row is parsed with the csv module so a quoted title that contains\n",
    "    commas stays a single field without its surrounding quotes; splitting\n",
    "    on ',' truncates such titles at their first comma.\n",
    "    \"\"\"\n",
    "    fields = next(csv.reader([line]))\n",
    "    return fields[0], fields[1]\n",
    "\n",
    "\n",
    "# Load the movies file, drop the header row, and parse each remaining row.\n",
    "movies = (\n",
    "    sc\n",
    "    .textFile(data_home + \"/movies\")\n",
    "    .filter(lambda line: not line.startswith(\"movieId\"))\n",
    "    .map(parse_movie)\n",
    ")\n",
    "\n",
    "for line in movies.take(10):\n",
    "    print(line)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('16', 4.0)\n",
      "('24', 1.5)\n",
      "('32', 4.0)\n",
      "('47', 4.0)\n",
      "('50', 4.0)\n",
      "('110', 4.0)\n",
      "('150', 3.0)\n",
      "('161', 4.0)\n",
      "('165', 3.0)\n",
      "('204', 0.5)\n"
     ]
    }
   ],
   "source": [
    "# Second and third CSV fields of every ratings row as (key, float rating);\n",
    "# the header row (the one starting with 'userId') is filtered out first.\n",
    "ratings = (\n",
    "    sc.textFile(data_home + \"/ratings\")\n",
    "    .filter(lambda row: not row.startswith(\"userId\"))\n",
    "    .map(lambda row: row.split(\",\"))\n",
    "    .map(lambda fields: (fields[1], float(fields[2])))\n",
    ")\n",
    "\n",
    "for line in ratings.take(10):\n",
    "    print(line)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('2250', 2.1666666666666665)\n",
      "('5951', 4.5)\n",
      "('6558', 2.6666666666666665)\n",
      "('66798', 3.1)\n",
      "('110501', 4.3)\n",
      "('4833', 1.0)\n",
      "('2265', 1.5)\n",
      "('27704', 2.0)\n",
      "('3665', 1.0)\n",
      "('4652', 2.0)\n"
     ]
    }
   ],
   "source": [
    "# Mean rating per key. Each rating becomes a (sum, count) pair that\n",
    "# reduceByKey merges two at a time, so no key's full list of ratings has to\n",
    "# be gathered the way groupByKey + sum()/len() requires.\n",
    "ratings_avg = (\n",
    "    ratings\n",
    "    .mapValues(lambda rating: (rating, 1))\n",
    "    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\n",
    "    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])\n",
    ")\n",
    "\n",
    "for p in ratings_avg.take(10):\n",
    "    print(p)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('53318', (4.0, 'Cashback (2006)'))\n",
      "('3793', (3.5576923076923075, 'X-Men (2000)'))\n",
      "('5460', (2.125, '\"Powerpuff Girls'))\n",
      "('25796', (3.0, '\"Ghoul'))\n",
      "('142074', (3.0, 'Knock Knock (2015)'))\n",
      "('4986', (0.5, 'Silent Rage (1982)'))\n",
      "('87660', (4.5, 'Too Big to Fail (2011)'))\n",
      "('26489', (1.5, 'Strange Invaders (1983)'))\n",
      "('27704', (2.0, 'Battle Royale 2: Requiem (Batoru rowaiaru II: Chinkonka) (2003)'))\n",
      "('59026', (4.0, '99 francs (2007)'))\n"
     ]
    }
   ],
   "source": [
    "for p in ratings_avg.join(movies).take(10):\n",
    "    print(p)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('Cashback (2006)', 4.0)\n",
      "('X-Men (2000)', 3.5576923076923075)\n",
      "('\"Powerpuff Girls', 2.125)\n",
      "('\"Ghoul', 3.0)\n",
      "('Knock Knock (2015)', 3.0)\n",
      "('Silent Rage (1982)', 0.5)\n",
      "('Too Big to Fail (2011)', 4.5)\n",
      "('Strange Invaders (1983)', 1.5)\n",
      "('Battle Royale 2: Requiem (Batoru rowaiaru II: Chinkonka) (2003)', 2.0)\n",
      "('99 francs (2007)', 4.0)\n"
     ]
    }
   ],
   "source": [
    "# Join the averages with the titles, drop the join key, and flip each\n",
    "# (avg, title) value into (title, avg).\n",
    "movie_avg_title = (\n",
    "    ratings_avg\n",
    "    .join(movies)\n",
    "    .values()\n",
    "    .map(lambda avg_and_title: (avg_and_title[1], avg_and_title[0]))\n",
    ")\n",
    "for p in movie_avg_title.take(10):\n",
    "    print(p)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.4.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
